1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.fail;
20 import static org.mockito.Matchers.any;
21 import static org.mockito.Matchers.anyInt;
22 import static org.mockito.Matchers.anyString;
23 import static org.mockito.Mockito.*;
24
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.mockito.MockitoAnnotations;
35
36 import rx.Observable;
37 import rx.Observer;
38 import rx.Producer;
39 import rx.Subscriber;
40 import rx.functions.Action2;
41 import rx.functions.Func0;
42 import rx.functions.Func1;
43 import rx.functions.Func2;
44 import rx.observers.TestSubscriber;
45
46 public class OperatorScanTest {
47
48 @Before
49 public void before() {
50 MockitoAnnotations.initMocks(this);
51 }
52
53 @Test
54 public void testScanIntegersWithInitialValue() {
55 @SuppressWarnings("unchecked")
56 Observer<String> observer = mock(Observer.class);
57
58 Observable<Integer> observable = Observable.just(1, 2, 3);
59
60 Observable<String> m = observable.scan("", new Func2<String, Integer, String>() {
61
62 @Override
63 public String call(String s, Integer n) {
64 return s + n.toString();
65 }
66
67 });
68 m.subscribe(observer);
69
70 verify(observer, never()).onError(any(Throwable.class));
71 verify(observer, times(1)).onNext("");
72 verify(observer, times(1)).onNext("1");
73 verify(observer, times(1)).onNext("12");
74 verify(observer, times(1)).onNext("123");
75 verify(observer, times(4)).onNext(anyString());
76 verify(observer, times(1)).onCompleted();
77 verify(observer, never()).onError(any(Throwable.class));
78 }
79
80 @Test
81 public void testScanIntegersWithoutInitialValue() {
82 @SuppressWarnings("unchecked")
83 Observer<Integer> observer = mock(Observer.class);
84
85 Observable<Integer> observable = Observable.just(1, 2, 3);
86
87 Observable<Integer> m = observable.scan(new Func2<Integer, Integer, Integer>() {
88
89 @Override
90 public Integer call(Integer t1, Integer t2) {
91 return t1 + t2;
92 }
93
94 });
95 m.subscribe(observer);
96
97 verify(observer, never()).onError(any(Throwable.class));
98 verify(observer, never()).onNext(0);
99 verify(observer, times(1)).onNext(1);
100 verify(observer, times(1)).onNext(3);
101 verify(observer, times(1)).onNext(6);
102 verify(observer, times(3)).onNext(anyInt());
103 verify(observer, times(1)).onCompleted();
104 verify(observer, never()).onError(any(Throwable.class));
105 }
106
107 @Test
108 public void testScanIntegersWithoutInitialValueAndOnlyOneValue() {
109 @SuppressWarnings("unchecked")
110 Observer<Integer> observer = mock(Observer.class);
111
112 Observable<Integer> observable = Observable.just(1);
113
114 Observable<Integer> m = observable.scan(new Func2<Integer, Integer, Integer>() {
115
116 @Override
117 public Integer call(Integer t1, Integer t2) {
118 return t1 + t2;
119 }
120
121 });
122 m.subscribe(observer);
123
124 verify(observer, never()).onError(any(Throwable.class));
125 verify(observer, never()).onNext(0);
126 verify(observer, times(1)).onNext(1);
127 verify(observer, times(1)).onNext(anyInt());
128 verify(observer, times(1)).onCompleted();
129 verify(observer, never()).onError(any(Throwable.class));
130 }
131
132 @Test
133 public void shouldNotEmitUntilAfterSubscription() {
134 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
135 Observable.range(1, 100).scan(0, new Func2<Integer, Integer, Integer>() {
136
137 @Override
138 public Integer call(Integer t1, Integer t2) {
139 return t1 + t2;
140 }
141
142 }).filter(new Func1<Integer, Boolean>() {
143
144 @Override
145 public Boolean call(Integer t1) {
146
147 return t1 > 0;
148 }
149
150 }).subscribe(ts);
151
152 assertEquals(100, ts.getOnNextEvents().size());
153 }
154
155 @Test
156 public void testBackpressureWithInitialValue() {
157 final AtomicInteger count = new AtomicInteger();
158 Observable.range(1, 100)
159 .scan(0, new Func2<Integer, Integer, Integer>() {
160
161 @Override
162 public Integer call(Integer t1, Integer t2) {
163 return t1 + t2;
164 }
165
166 })
167 .subscribe(new Subscriber<Integer>() {
168
169 @Override
170 public void onStart() {
171 request(10);
172 }
173
174 @Override
175 public void onCompleted() {
176
177 }
178
179 @Override
180 public void onError(Throwable e) {
181 fail(e.getMessage());
182 e.printStackTrace();
183 }
184
185 @Override
186 public void onNext(Integer t) {
187 count.incrementAndGet();
188 }
189
190 });
191
192
193 assertEquals(10, count.get());
194 }
195
196 @Test
197 public void testBackpressureWithoutInitialValue() {
198 final AtomicInteger count = new AtomicInteger();
199 Observable.range(1, 100)
200 .scan(new Func2<Integer, Integer, Integer>() {
201
202 @Override
203 public Integer call(Integer t1, Integer t2) {
204 return t1 + t2;
205 }
206
207 })
208 .subscribe(new Subscriber<Integer>() {
209
210 @Override
211 public void onStart() {
212 request(10);
213 }
214
215 @Override
216 public void onCompleted() {
217
218 }
219
220 @Override
221 public void onError(Throwable e) {
222 fail(e.getMessage());
223 e.printStackTrace();
224 }
225
226 @Override
227 public void onNext(Integer t) {
228 count.incrementAndGet();
229 }
230
231 });
232
233
234 assertEquals(10, count.get());
235 }
236
237 @Test
238 public void testNoBackpressureWithInitialValue() {
239 final AtomicInteger count = new AtomicInteger();
240 Observable.range(1, 100)
241 .scan(0, new Func2<Integer, Integer, Integer>() {
242
243 @Override
244 public Integer call(Integer t1, Integer t2) {
245 return t1 + t2;
246 }
247
248 })
249 .subscribe(new Subscriber<Integer>() {
250
251 @Override
252 public void onCompleted() {
253
254 }
255
256 @Override
257 public void onError(Throwable e) {
258 fail(e.getMessage());
259 e.printStackTrace();
260 }
261
262 @Override
263 public void onNext(Integer t) {
264 count.incrementAndGet();
265 }
266
267 });
268
269
270 assertEquals(101, count.get());
271 }
272
273
274
275
276 @Test
277 public void testSeedFactory() {
278 Observable<List<Integer>> o = Observable.range(1, 10)
279 .collect(new Func0<List<Integer>>() {
280
281 @Override
282 public List<Integer> call() {
283 return new ArrayList<Integer>();
284 }
285
286 }, new Action2<List<Integer>, Integer>() {
287
288 @Override
289 public void call(List<Integer> list, Integer t2) {
290 list.add(t2);
291 }
292
293 }).takeLast(1);
294
295 assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
296 assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
297 }
298
299 @Test
300 public void testScanWithRequestOne() {
301 Observable<Integer> o = Observable.just(1, 2).scan(0, new Func2<Integer, Integer, Integer>() {
302
303 @Override
304 public Integer call(Integer t1, Integer t2) {
305 return t1 + t2;
306 }
307
308 }).take(1);
309 TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
310 o.subscribe(subscriber);
311 subscriber.assertReceivedOnNext(Arrays.asList(0));
312 subscriber.assertTerminalEvent();
313 subscriber.assertNoErrors();
314 }
315
316 @Test
317 public void testScanShouldNotRequestZero() {
318 final AtomicReference<Producer> producer = new AtomicReference<Producer>();
319 Observable<Integer> o = Observable.create(new Observable.OnSubscribe<Integer>() {
320 @Override
321 public void call(final Subscriber<? super Integer> subscriber) {
322 Producer p = spy(new Producer() {
323
324 private AtomicBoolean requested = new AtomicBoolean(false);
325
326 @Override
327 public void request(long n) {
328 if (requested.compareAndSet(false, true)) {
329 subscriber.onNext(1);
330 } else {
331 subscriber.onCompleted();
332 }
333 }
334 });
335 producer.set(p);
336 subscriber.setProducer(p);
337 }
338 }).scan(100, new Func2<Integer, Integer, Integer>() {
339
340 @Override
341 public Integer call(Integer t1, Integer t2) {
342 return t1 + t2;
343 }
344
345 });
346
347 o.subscribe(new TestSubscriber<Integer>() {
348
349 @Override
350 public void onStart() {
351 request(1);
352 }
353
354 @Override
355 public void onNext(Integer integer) {
356 request(1);
357 }
358 });
359
360 verify(producer.get(), never()).request(0);
361 verify(producer.get(), times(2)).request(1);
362 }
363 }